#!/usr/bin/env python3
# group: rw
#
# Tests for internal snapshot.
#
# Copyright (C) 2013 IBM, Inc.
#
# Based on 055.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import time
import os
import iotests
from iotests import qemu_img, qemu_io

test_drv_base_name = 'drive'

class ImageSnapshotTestCase(iotests.QMPTestCase):
    image_len = 120 * 1024 * 1024 # MB

    def __init__(self, *args):
        self.expect = []
        super(ImageSnapshotTestCase, self).__init__(*args)

    def _setUp(self, test_img_base_name, image_num):
        self.vm = iotests.VM()
        for i in range(0, image_num):
            filename = '%s%d' % (test_img_base_name, i)
            img = os.path.join(iotests.test_dir, filename)
            device = '%s%d' % (test_drv_base_name, i)
            qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len))
            self.vm.add_drive(img)
            self.expect.append({'image': img, 'device': device,
                                'snapshots': [],
                                'snapshots_name_counter': 0})
        self.vm.launch()

    def tearDown(self):
        self.vm.shutdown()
        for dev_expect in self.expect:
            os.remove(dev_expect['image'])

    def createSnapshotInTransaction(self, snapshot_num, abort = False):
        actions = []
        for dev_expect in self.expect:
            num = dev_expect['snapshots_name_counter']
            for j in range(0, snapshot_num):
                name = '%s_sn%d' % (dev_expect['device'], num)
                num = num + 1
                if abort == False:
                    dev_expect['snapshots'].append({'name': name})
                    dev_expect['snapshots_name_counter'] = num
                actions.append({
                    'type': 'blockdev-snapshot-internal-sync',
                    'data': { 'device': dev_expect['device'],
                              'name': name },
                })

        if abort == True:
            actions.append({
                'type': 'abort',
                'data': {},
            })

        result = self.vm.qmp('transaction', actions = actions)

        if abort == True:
            self.assert_qmp(result, 'error/class', 'GenericError')
        else:
            self.assert_qmp(result, 'return', {})

    def verifySnapshotInfo(self):
        result = self.vm.qmp('query-block')

        # Verify each expected result
        for dev_expect in self.expect:
            # 1. Find the returned image value and snapshot info
            image_result = None
            for device in result['return']:
                if device['device'] == dev_expect['device']:
                    image_result = device['inserted']['image']
                    break
            self.assertTrue(image_result != None)
            # Do not consider zero snapshot case now
            sn_list_result = image_result['snapshots']
            sn_list_expect = dev_expect['snapshots']

            # 2. Verify it with expect
            self.assertTrue(len(sn_list_result) == len(sn_list_expect))

            for sn_expect in sn_list_expect:
                sn_result = None
                for sn in sn_list_result:
                    if sn_expect['name'] == sn['name']:
                        sn_result = sn
                        break
                self.assertTrue(sn_result != None)
                # Fill in the detail info
                sn_expect.update(sn_result)

    def deleteSnapshot(self, device, id = None, name = None):
        sn_list_expect = None
        sn_expect = None

        self.assertTrue(id != None or name != None)

        # Fill in the detail info include ID
        self.verifySnapshotInfo()

        #find the expected snapshot list
        for dev_expect in self.expect:
            if dev_expect['device'] == device:
                sn_list_expect = dev_expect['snapshots']
                break
        self.assertTrue(sn_list_expect != None)

        if id != None and name != None:
            for sn in sn_list_expect:
                if sn['id'] == id and sn['name'] == name:
                    sn_expect = sn
                    result = \
                          self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                                      device = device,
                                      id = id,
                                      name = name)
                    break
        elif id != None:
            for sn in sn_list_expect:
                if sn['id'] == id:
                    sn_expect = sn
                    result = \
                          self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                                      device = device,
                                      id = id)
                    break
        else:
            for sn in sn_list_expect:
                if sn['name'] == name:
                    sn_expect = sn
                    result = \
                          self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                                      device = device,
                                      name = name)
                    break

        self.assertTrue(sn_expect != None)

        self.assert_qmp(result, 'return', sn_expect)
        sn_list_expect.remove(sn_expect)

class TestSingleTransaction(ImageSnapshotTestCase):
    def setUp(self):
        self._setUp('test_a.img', 1)

    def test_create(self):
        self.createSnapshotInTransaction(1)
        self.verifySnapshotInfo()

    def test_error_name_empty(self):
        actions = [{'type': 'blockdev-snapshot-internal-sync',
                    'data': { 'device': self.expect[0]['device'],
                              'name': '' },
                  }]
        result = self.vm.qmp('transaction', actions = actions)
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_error_device(self):
        actions = [{'type': 'blockdev-snapshot-internal-sync',
                    'data': { 'device': 'drive_error',
                              'name': 'a' },
                  }]
        result = self.vm.qmp('transaction', actions = actions)
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_error_exist(self):
        self.createSnapshotInTransaction(1)
        self.verifySnapshotInfo()
        actions = [{'type': 'blockdev-snapshot-internal-sync',
                    'data': { 'device': self.expect[0]['device'],
                              'name': self.expect[0]['snapshots'][0] },
                  }]
        result = self.vm.qmp('transaction', actions = actions)
        self.assert_qmp(result, 'error/class', 'GenericError')

class TestMultipleTransaction(ImageSnapshotTestCase):
    def setUp(self):
        self._setUp('test_b.img', 2)

    def test_create(self):
        self.createSnapshotInTransaction(3)
        self.verifySnapshotInfo()

    def test_abort(self):
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        self.createSnapshotInTransaction(3, abort = True)
        self.verifySnapshotInfo()

class TestSnapshotDelete(ImageSnapshotTestCase):
    def setUp(self):
        self._setUp('test_c.img', 1)

    def test_delete_with_id(self):
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        self.deleteSnapshot(self.expect[0]['device'],
                            id = self.expect[0]['snapshots'][0]['id'])
        self.verifySnapshotInfo()

    def test_delete_with_name(self):
        self.createSnapshotInTransaction(3)
        self.verifySnapshotInfo()
        self.deleteSnapshot(self.expect[0]['device'],
                            name = self.expect[0]['snapshots'][1]['name'])
        self.verifySnapshotInfo()

    def test_delete_with_id_and_name(self):
        self.createSnapshotInTransaction(4)
        self.verifySnapshotInfo()
        self.deleteSnapshot(self.expect[0]['device'],
                            id = self.expect[0]['snapshots'][2]['id'],
                            name = self.expect[0]['snapshots'][2]['name'])
        self.verifySnapshotInfo()


    def test_error_device(self):
        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                              device = 'drive_error',
                              id = '0')
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_error_no_id_and_name(self):
        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                              device = self.expect[0]['device'])
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_error_snapshot_not_exist(self):
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                              device = self.expect[0]['device'],
                              id = self.expect[0]['snapshots'][0]['id'],
                              name = self.expect[0]['snapshots'][1]['name'])
        self.assert_qmp(result, 'error/class', 'GenericError')

if __name__ == '__main__':
    iotests.main(supported_fmts=['qcow2'],
                 supported_protocols=['file'])
